feat: add LazyPartitioned mode for hash join to reduce RepartitionExec overhead #19808
run benchmark tpch tpcds
    PartitionMode::LazyPartitioned => {
        // LazyPartitioned mode: build side is NOT repartitioned (we read all
        // partitions and filter locally), but probe side IS hash-partitioned.
        let right_expr = self.on.iter().map(|(_, r)| Arc::clone(r)).collect();
- We can probably implement it for both sides (`vec![Distribution::UnspecifiedDistribution, Distribution::UnspecifiedDistribution]`) to also save the `RepartitionExec` on the right side.
- I think we should strive to replace `PartitionMode::Partitioned` with the new implementation (just a faster version).
- We should compute the hashes and indices only once, instead of once for each partition (so that a high partition count does not make it slow).
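To illustrate the last point, here is a minimal sketch of hashing each build row once and bucketing row indices by target partition, so that no partition has to rehash the whole input. It is a toy under stated assumptions: `bucket_row_indices` is a hypothetical helper, join keys are plain `u64` values rather than Arrow arrays, and `DefaultHasher` stands in for whatever hash function the real join uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: hash every join key exactly once and record which
/// partition each row index belongs to (hash(key) % partition_count).
pub fn bucket_row_indices(keys: &[u64], partition_count: usize) -> Vec<Vec<usize>> {
    assert!(partition_count > 0, "partition_count must be non-zero");
    let mut buckets: Vec<Vec<usize>> = vec![Vec::new(); partition_count];
    for (row_idx, key) in keys.iter().enumerate() {
        // Assumption: DefaultHasher is only a stand-in for the real hash.
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let partition = (hasher.finish() % partition_count as u64) as usize;
        buckets[partition].push(row_idx);
    }
    buckets
}
```

Each partition could then take only the rows listed in its own bucket, which keeps the hashing cost at one pass over the input regardless of the partition count.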
OK, I'll update it.
Perhaps it is even better to start with the following (only avoid repartitioning the probe side; I meant this originally but put build side in the description):
    vec![Distribution::Hash(left_expr), Distribution::UnspecifiedDistribution]
as it will probably be needed anyway to redistribute/prepare the left side, and the probe side will often be the larger one.
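The three distribution choices discussed in this thread can be put side by side in a small sketch. This is a toy model, not DataFusion's API: the `Distribution` enum below mirrors the spelling used in the comments above, join keys are plain column names, and `LazySide` is a hypothetical selector introduced only for illustration.

```rust
/// Toy stand-in for the distribution requirement of one join input.
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    UnspecifiedDistribution,
    Hash(Vec<String>),
}

/// Hypothetical selector: which side skips the upfront repartition.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LazySide {
    Build,
    Probe,
    Both,
}

/// Returns the requirement as [left (build), right (probe)].
pub fn required_input_distribution(on: &[(String, String)], lazy: LazySide) -> Vec<Distribution> {
    let left_expr: Vec<String> = on.iter().map(|(l, _)| l.clone()).collect();
    let right_expr: Vec<String> = on.iter().map(|(_, r)| r.clone()).collect();
    match lazy {
        // As in the PR: build side not repartitioned, probe side hashed.
        LazySide::Build => vec![
            Distribution::UnspecifiedDistribution,
            Distribution::Hash(right_expr),
        ],
        // As suggested above: build side hashed, probe side left as-is.
        LazySide::Probe => vec![
            Distribution::Hash(left_expr),
            Distribution::UnspecifiedDistribution,
        ],
        // First suggestion in the thread: neither side repartitioned.
        LazySide::Both => vec![
            Distribution::UnspecifiedDistribution,
            Distribution::UnspecifiedDistribution,
        ],
    }
}
```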
OK, I'll update that as well.
FYI, I created an (AI-based) variant on the idea in #19812 and put some bench results in the description
Thanks for sharing this. I’ll review the AI-based variant and the benchmarks you added and see how it compares with this approach.
…c overhead

This commit adds a new PartitionMode::LazyPartitioned that avoids the full build-side RepartitionExec when executing partitioned hash joins. Instead of pre-repartitioning all columns of the build table, rows are filtered lazily during hash table construction using hash(join_keys) % partition_count.

Key changes:
- Add LazyPartitioned variant to PartitionMode enum
- Build side requests UnspecifiedDistribution (merged, no repartition)
- Probe side still requests HashPartitioned distribution
- Add filter_batch_by_partition() to filter build rows per partition
- Update collect_left_input to accept optional partition filter
- Add protobuf serialization support for new mode
- Update optimizer to handle LazyPartitioned in key reordering

This optimization is beneficial for wide build tables where copying all columns in RepartitionExec is expensive.

Closes apache#19789
Which issue does this PR close?
Closes #19789
Rationale for this change
When executing a partitioned hash join with a wide build table (many columns), the current `PartitionMode::Partitioned` approach adds a `RepartitionExec` that copies all columns of the build table via `take_arrays`. This is expensive when only the join key columns are needed for partitioning.
What changes are included in this PR?
This PR introduces a new `PartitionMode::LazyPartitioned` that avoids the full build-side `RepartitionExec`. Instead:

- The build side requests `UnspecifiedDistribution` (no repartitioning). All partitions are merged via `CoalescePartitionsExec`.
- The probe side requests `HashPartitioned` distribution (repartitioned as before).
- During hash table construction, build rows are filtered by `hash(join_keys) % partition_count == current_partition`, ensuring each partition only builds its relevant subset.

Key changes:

| File | Change |
|---|---|
| `joins/mod.rs` | Added `LazyPartitioned` variant to `PartitionMode` enum |
| `joins/hash_join/exec.rs` | Added `PartitionFilter` struct and `filter_batch_by_partition()` function; updated `required_input_distribution()` and `execute()` |
| `joins/hash_join/stream.rs` | |
| `joins/hash_join/shared_bounds.rs` | `SharedBuildAccumulator` handling |
| `physical-optimizer/enforce_distribution.rs` | Added `LazyPartitioned` to key reordering logic |
| `physical-optimizer/join_selection.rs` | |
| `proto/datafusion.proto` | `LAZY_PARTITIONED = 3` |
| `proto/src/physical_plan/mod.rs` | |

Are these changes tested?
Yes. All existing tests pass:
roundtrip_hash_join)
Are there any user-facing changes?
No breaking changes. This adds a new `PartitionMode::LazyPartitioned` option that can be explicitly selected for hash joins where the build table is wide but the join key is narrow. The existing `Partitioned`, `CollectLeft`, and `Auto` modes remain unchanged.
Performance Impact
For wide build tables, `LazyPartitioned` avoids copying non-join-key columns during repartitioning, reducing memory allocations and CPU overhead. The trade-off is that each partition now scans all build rows (but only retains those matching its partition).
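
The trade-off can be made concrete with a minimal sketch of the lazy filter. It is not the PR's `filter_batch_by_partition()`: the `Row` struct and both function names are hypothetical, rows are plain structs rather than Arrow record batches, and `DefaultHasher` stands in for the hash the join actually uses (which would have to match the hash used to partition the probe side).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical build-side row: a narrow join key plus a wide payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub key: u64,
    pub payload: String,
}

/// hash(join_key) % partition_count, with DefaultHasher as a stand-in hash.
pub fn partition_of(key: u64, partition_count: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % partition_count as u64) as usize
}

/// Scan every build row, but clone (retain) only the rows owned by
/// `current_partition`. Payloads of other rows are never copied.
pub fn filter_rows_by_partition(
    rows: &[Row],
    partition_count: usize,
    current_partition: usize,
) -> Vec<Row> {
    rows.iter()
        .filter(|row| partition_of(row.key, partition_count) == current_partition)
        .cloned()
        .collect()
}
```

With `partition_count` partitions, every partition calls `filter_rows_by_partition` over the full build input, so the keys are scanned and hashed `partition_count` times in total, while each wide payload is copied only once, by the partition that owns it.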